#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "limits.h"
#include "adt.h"
#include "ynet_rpc.h"
#include "sysy_lib.h"
#include "cluster.h"
#include "chunk.h"
#include "bmap.h"
#include "metadata.h"
#include "../replica/replica.h"
#include "net_table.h"
#include "configure.h"
#include "net_global.h"
#include "../storage/stor_rpc.h"
#include "../controller/stor_ctl.h"
#include "../controller/md_proto.h"
#include "md_map.h"
#include "md_root.h"
#include "md_parent.h"
#include "lich_md.h"
#include "ylog.h"
#include "dbg.h"

/**
 * Send a chunk "set" request to the controller @nid.
 *
 * Calls stor_ctl_chunk_set() directly when net_islocalcall(nid) is true,
 * otherwise forwards the request through stor_rpc_chunk_set().
 *
 * @return 0 on success, otherwise the error code of the callee
 */
STATIC int __md_chunk_set(const nid_t *nid, const fileid_t *parent,
                          const chkid_t *chkid, const nid_t *_nid, int status)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_chunk_set(parent, chkid, _nid, status);
        else
                ret = stor_rpc_chunk_set(nid, parent, chkid, _nid, status);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Set @status for replica @_nid of @chkid on the controller of its parent.
 *
 * The @_parent argument is ignored; the parent id is derived from @chkid:
 * raw and sub chunks go through cid2fid() (sub chunks additionally get the
 * matching volume/pool type), while volume/pool chunks use their own id with
 * idx forced to 0.
 *
 * If the request fails with EREMCHG or ETIMEDOUT the cached server mapping
 * is dropped and the request is re-issued exactly once.
 *
 * @return 0 on success, otherwise an errno-style code
 */
int md_chunk_set(const fileid_t *_parent, const chkid_t *chkid, const nid_t *_nid, int status)
{
        int ret;
        int remapped = 0;
        nid_t srv;
        fileid_t parent;

        (void) _parent;

        if (chkid->type == __RAW_CHUNK__) {
                cid2fid(&parent, chkid);
        } else if (chkid->type == __VOLUME_SUB_CHUNK__) {
                cid2fid(&parent, chkid);
                parent.type = __VOLUME_CHUNK__;
        } else if (chkid->type == __POOL_SUB_CHUNK__) {
                cid2fid(&parent, chkid);
                parent.type = __POOL_CHUNK__;
        } else {
                YASSERT(chkid->type == __VOLUME_CHUNK__ || chkid->type == __POOL_CHUNK__);
                parent = *chkid;
                parent.idx = 0;
        }

        YASSERT(!chkid_isroot(&parent));

        for (;;) {
                ret = md_map_getsrv(&parent, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_chunk_set(&srv, &parent, chkid, _nid, status);
                if (!ret)
                        break;

                /* only one remap-and-retry, and only for these two errors */
                if (remapped || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(&parent, &srv);
                remapped = 1;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Argument bundle passed from __md_chunk_update_local() to the task entry
 * __md_chunk_update_local__(). The first four fields are forwarded verbatim
 * to stor_ctl_chunk_update(); all pointers are borrowed from the caller.
 */
typedef struct {
        const chkid_t *parent;
        const chkinfo_t *chkinfo;
        const nid_t *owner;
        uint64_t info_version;
        task_t task; /* value of schedule_task_get() in the waiting caller; handed to schedule_resume() */
} update_arg_t;

/*
 * Task entry created by __md_chunk_update_local(): performs the local
 * stor_ctl_chunk_update() call described by the update_arg_t in @_arg, logs
 * the outcome and passes the return code to schedule_resume() for the task
 * recorded in the argument.
 */
static void __md_chunk_update_local__(void *_arg)
{
        update_arg_t *ctx = _arg;
        int ret = stor_ctl_chunk_update(ctx->parent, ctx->chkinfo,
                                        ctx->owner, ctx->info_version);

        DINFO("update "CHKID_FORMAT" localy ret:%d\n", CHKID_ARG(&ctx->chkinfo->id), ret);

        schedule_resume(&ctx->task, ret, NULL);
}

/**
 * Run stor_ctl_chunk_update() on the local controller.
 *
 * When called from inside a scheduler task (schedule_running() is true) the
 * update is executed by a newly created task (__md_chunk_update_local__) and
 * this task yields until it is resumed with the result. The original author's
 * note for this indirection is "avoid stack overlapping" -- presumably it
 * keeps the controller call off the caller's task stack (TODO confirm).
 * Outside the scheduler the call is made directly.
 *
 * @return 0 on success, otherwise the error code of the update
 */
STATIC int __md_chunk_update_local(const chkid_t *parent,
                                   const chkinfo_t *chkinfo, const nid_t *owner, uint64_t info_version)
{
        int ret;
        update_arg_t arg;

        if (schedule_running()) {
                /* arg lives on this stack; it stays valid because we do not
                 * return before schedule_yield() comes back */
                arg.parent = parent;
                arg.chkinfo = chkinfo;
                arg.owner = owner;
                arg.info_version = info_version;
                arg.task = schedule_task_get();

                schedule_task_new("chunk_update_local", __md_chunk_update_local__, &arg, -1);

                ret = schedule_yield("chunk_update_local", NULL, NULL);
                if (unlikely(ret)) {
                        /* ret is a signed int: print it with %d, like the
                         * matching log line in __md_chunk_update_local__() */
                        DWARN("update "CHKID_FORMAT" localy fail %d\n",
                              CHKID_ARG(&arg.chkinfo->id), ret);
                        GOTO(err_ret, ret);
                }
        } else {
                ret = stor_ctl_chunk_update(parent, chkinfo, owner, info_version);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Send a chunk-info update to the controller @nid: through
 * __md_chunk_update_local() when net_islocalcall(nid) is true, through
 * stor_rpc_chunk_update() otherwise.
 *
 * @return 0 on success, otherwise the error code of the callee
 */
STATIC int __md_chunk_update(const nid_t *nid, const chkid_t *parent,
                             const chkinfo_t *chkinfo, const nid_t *owner, uint64_t info_version)
{
        int ret;

        if (net_islocalcall(nid))
                ret = __md_chunk_update_local(parent, chkinfo, owner, info_version);
        else
                ret = stor_rpc_chunk_update(nid, parent, chkinfo, owner, info_version);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Publish @chkinfo (with @owner and @info_version) to the controller of
 * @parent.
 *
 * A root parent is delegated to md_root_chunk_update(). Otherwise the server
 * of @parent is looked up through md_map_getsrv() and the update is sent
 * there; on EREMCHG or ETIMEDOUT the cached mapping is dropped and the
 * request is repeated once. After a successful update of a pool chunk the
 * md map is refreshed with the first replica in @chkinfo.
 *
 * @return 0 on success, otherwise an errno-style code
 */
int md_chunk_update(const char *pool, const chkid_t *parent,
                    const chkinfo_t *chkinfo, const nid_t *owner, uint64_t info_version)
{
        int ret;
        int remapped = 0;
        nid_t srv;

        if (chkid_isroot(parent)) {
                // update etcd
                return md_root_chunk_update(pool, chkinfo, owner, info_version);
        }

        for (;;) {
                ret = md_map_getsrv(parent, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_chunk_update(&srv, parent, chkinfo, owner, info_version);
                if (!ret)
                        break;

                /* a single remap-and-retry, only for EREMCHG / ETIMEDOUT */
                if (remapped || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(parent, &srv);
                remapped = 1;
        }

        CHKINFO_DUMP(chkinfo, D_INFO);

        if (chkinfo->id.type == __POOL_CHUNK__)
                md_map_update(&chkinfo->id, &chkinfo->diskid[0].id);

        return 0;
err_ret:
        return ret;
}

/**
 * Ask node @nid to migrate @chkid: through stor_ctl_chunk_migrate() when
 * net_islocalcall(nid) is true, through stor_rpc_chunk_migrate() otherwise.
 * The callee receives @chkinfo as its last argument.
 *
 * @return 0 on success, otherwise the error code of the callee
 */
STATIC int __md_chunk_migrate____(const nid_t *nid, const fileid_t *parent,
                               const chkid_t *chkid, uint32_t force, chkinfo_t *chkinfo)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_chunk_migrate(parent, chkid, force, chkinfo);
        else
                ret = stor_rpc_chunk_migrate(nid, parent, chkid, force, chkinfo);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Try the replicas of @chkinfo, starting at index 1 (diskid[0] is skipped),
 * until one of them accepts the migrate request.
 *
 * EREMCHG from a replica aborts the scan immediately; any other error moves
 * on to the next replica.
 *
 * @param parent  parent id forwarded to the migrate call
 * @param chkinfo current chunk info; its replica list is scanned
 * @param newinfo passed to the migrate call as its chkinfo argument
 * @param force   forwarded to the migrate call
 * @return 0 if a replica accepted the request, EREMCHG as described above,
 *         EAGAIN if no replica accepted it
 */
STATIC int __md_chunk_migrate__(const fileid_t *parent, const chkinfo_t *chkinfo,
                               chkinfo_t *newinfo, uint32_t force)
{
        int ret, i;
        const nid_t *nid;

        for (i = 1; i < chkinfo->repnum; i++) {
                nid = &chkinfo->diskid[i].id;

                ret = __md_chunk_migrate____(nid, parent, &chkinfo->id, force, newinfo);
                if (unlikely(ret)) {
                        if (ret == EREMCHG) {
                                GOTO(err_ret, ret);
                        } else
                                continue;
                }

                break;
        }

        /* ">=" rather than "==": with repnum == 0 the loop never runs and i
         * stays at 1, which must not be reported as success because newinfo
         * was never written. */
        if (i >= chkinfo->repnum) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Migrate @chkid to one of the other replicas listed in @_chkinfo and return
 * the resulting chunk info in @_chkinfo.
 *
 * @param pool     unused
 * @param parent   parent id, or NULL to resolve it with md_parent_get()
 * @param chkid    chunk to migrate; must equal the id of the returned info
 * @param force    forwarded to the migrate request
 * @param _chkinfo in: current chunk info (replica list to try);
 *                 out: chunk info returned by the migrate request
 * @return 0 on success, otherwise an errno-style code
 */
STATIC int __md_chunk_migrate(const char *pool, const fileid_t *parent, const chkid_t *chkid, int force,
                              chkinfo_t *_chkinfo)
{
        int ret;
        /* Must not be const: md_parent_get() writes into it. Writing to an
         * object defined const (the previous code did so through a cast) is
         * undefined behaviour. */
        fileid_t _parent;
        chkinfo_t *chkinfo;
        char buf[CHKINFO_MAX];

        (void) pool;

        if (parent == NULL) {
                ret = md_parent_get(chkid, &_parent);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                parent = &_parent;
        }

        /* the new info is collected in a scratch buffer so that _chkinfo is
         * only overwritten after the request succeeded */
        chkinfo = (void *)buf;
        CHKINFO_DUMP(_chkinfo, D_INFO);
        ret = __md_chunk_migrate__(parent, _chkinfo, chkinfo, force);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        CHKINFO_DUMP(chkinfo, D_INFO);
        YASSERT(chkid_cmp(chkid, &chkinfo->id) == 0);

        if (chkinfo->id.type == __POOL_CHUNK__) {
                md_map_update(&chkinfo->id, &chkinfo->diskid[0].id);
        }

        if (_chkinfo)
                memcpy(_chkinfo, chkinfo, CHKINFO_SIZE(chkinfo->repnum));

        return 0;
err_ret:
        return ret;
}

/**
 * Send a "reject replica @bad of @chkid" request to the controller @nid:
 * through stor_ctl_chunk_reject() when net_islocalcall(nid) is true, through
 * stor_rpc_chunk_reject() otherwise. The callee receives @chkinfo as its
 * last argument.
 *
 * @return 0 on success, otherwise the error code of the callee
 */
STATIC int __md_chunk_reject__(const nid_t *nid, const fileid_t *parent,
                       const chkid_t *chkid, const nid_t *bad, chkinfo_t *chkinfo)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_chunk_reject(parent, chkid, bad, chkinfo);
        else
                ret = stor_rpc_chunk_reject(nid, parent, chkid, bad, chkinfo);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Reject replica @bad of @chkid on the controller of its parent.
 *
 * @param pool      pool name, used only for a root parent
 * @param parent    parent id, or NULL to resolve it with md_parent_get()
 * @param chkid     chunk whose replica is rejected
 * @param bad       replica to reject
 * @param _chkinfo  optional out: chunk info returned by the controller
 * @param parentnid optional out: node that served the request
 * @return 0 on success, otherwise an errno-style code
 *
 * On EREMCHG or ETIMEDOUT the cached server mapping is dropped and the
 * request is repeated once.
 */
int md_chunk_reject(const char *pool, const fileid_t *parent, const chkid_t *chkid, const nid_t *bad,
                    chkinfo_t *_chkinfo, nid_t *parentnid)
{
        int ret, retry = 0;
        nid_t nid;
        /* Must not be const: md_parent_get() writes into it. Writing to an
         * object defined const (the previous code did so through a cast) is
         * undefined behaviour. */
        fileid_t _parent;
        chkinfo_t *chkinfo;
        char buf[CHKINFO_MAX];

        chkinfo = (void *)buf;

        /* Resolve a NULL parent before it is inspected, in the same order as
         * md_chunk_getinfo(); the root test used to run first and received
         * the NULL pointer. */
        if (parent == NULL) {
                ret = md_parent_get(chkid, &_parent);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                parent = &_parent;
        }

        if (chkid_isroot(parent)) {
                /* NOTE(review): chkinfo points at the still uninitialised
                 * scratch buffer here and _chkinfo is not filled on this
                 * path -- presumably md_root_chunk_reject() treats it as an
                 * output; confirm against its definition. */
                return md_root_chunk_reject(pool, chkinfo, bad, parentnid);
        }

retry:
        ret = md_map_getsrv(parent, &nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __md_chunk_reject__(&nid, parent, chkid, bad, chkinfo);
        if (unlikely(ret)) {
                if ((ret == EREMCHG || ret == ETIMEDOUT) && retry == 0) {
                        md_map_drop(parent, &nid);
                        retry = 1;
                        goto retry;
                } else
                        GOTO(err_ret, ret);
        }

        CHKINFO_DUMP(chkinfo, D_INFO);
        YASSERT(chkid_cmp(chkid, &chkinfo->id) == 0);

        if (chkinfo->id.type == __POOL_CHUNK__) {
                md_map_update(&chkinfo->id, &chkinfo->diskid[0].id);
        }

        if (_chkinfo)
                memcpy(_chkinfo, chkinfo, CHKINFO_SIZE(chkinfo->repnum));
        if (parentnid)
                *parentnid = nid;

        return 0;
err_ret:
        return ret;
}

/**
 * Check that the first replica of a pool/volume chunk can be connected to
 * and, if it cannot, request a migration to another replica.
 *
 * Chunks of any other type are ignored. When network_connect() to
 * diskid[0] succeeds nothing else is done. Otherwise __md_chunk_migrate() is
 * called with force = 0 and @chkinfo is used both as its input and output.
 *
 * @return 0 on success or when nothing had to be done, otherwise the error
 *         code of the migration
 */
int md_chunk_online(const char *pool, const fileid_t *parent, chkinfo_t *chkinfo)
{
        int ret;
        int has_controller;

        has_controller = (chkinfo->id.type == __POOL_CHUNK__
                          || chkinfo->id.type == __VOLUME_CHUNK__);
        if (unlikely(!has_controller))
                return 0;

        ret = network_connect(&chkinfo->diskid[0].id, NULL, 1, 0);
        if (unlikely(ret == 0))
                return 0;

        DBUG("chunk "CHKID_FORMAT" need new master\n", CHKID_ARG(&chkinfo->id));

        ret = __md_chunk_migrate(pool, parent, &chkinfo->id, 0, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Ask the controller @nid to move @chkid to the locations in @dist.
 *
 * The request is made through stor_ctl_chunk_move() when
 * net_islocalcall(nid) is true and through stor_rpc_chunk_move() otherwise.
 * The result is passed through _errno_net() before it is examined.
 *
 * @param nid        node the request is sent to
 * @param srv        id passed as first argument of the move call
 *                   (presumably the controller's chunk id -- confirm)
 * @param chkid      chunk to move
 * @param dist       array of target node ids
 * @param dist_count number of entries in @dist
 * @return 0 on success, otherwise an errno-style code
 */
int md_chunk_move(const nid_t *nid, const fileid_t *srv, const chkid_t *chkid, const nid_t *dist, int dist_count)
{
        int ret, retry = 0;

        metadata_check_diskid(dist, dist_count);
retry:
        if (net_islocalcall(nid)) {
                ret = stor_ctl_chunk_move(srv, chkid, dist, dist_count);
        } else {
                ret = stor_rpc_chunk_move(nid, srv, chkid, dist, dist_count);
        }

        ret = _errno_net(ret);
        if (unlikely(ret)) {
                if (ret == EAGAIN || ret == ENOSPC) {
                        /* NOTE(review): USLEEP_RETRY is defined elsewhere;
                         * presumably it sleeps 100ms and jumps back to the
                         * retry label up to 5 times, then to err_ret --
                         * confirm against the macro. */
                        USLEEP_RETRY(err_ret, ret, retry, retry, 5, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Ask the controller @nid to check @chkid.
 *
 * The request is made through stor_ctl_chunk_check() when
 * net_islocalcall(nid) is true and through stor_rpc_chunk_check() otherwise.
 * The result is passed through _errno_net() before it is examined.
 *
 * @param nid   controller nid
 * @param srv   controller chkid
 * @param chkid chunk of controller
 * @param idx   forwarded unchanged to the check call
 * @param async forwarded unchanged to the check call
 * @param force forwarded unchanged to the check call
 * @param oflags forwarded unchanged to the check call
 *               (presumably an output -- confirm)
 * @return 0 on success, otherwise an errno-style code
 */
int md_chunk_check(const nid_t *nid, const fileid_t *srv,
                   const chkid_t *chkid, int idx, int async, int force, int *oflags)
{
        int ret, retry = 0;

retry:
        if (net_islocalcall(nid)) {
                ret = stor_ctl_chunk_check(srv, chkid, idx, async, force, oflags);
        } else {
                ret = stor_rpc_chunk_check(nid, srv, chkid, idx, async, force, oflags);
        }

        ret = _errno_net(ret);
        if (unlikely(ret)) {
                if (ret == EAGAIN || ret == ENOSPC) {
                        /* NOTE(review): USLEEP_RETRY is defined elsewhere;
                         * presumably it sleeps 100ms and jumps back to the
                         * retry label up to 5 times, then to err_ret --
                         * confirm against the macro. */
                        USLEEP_RETRY(err_ret, ret, retry, retry, 5, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Send a chunk allocate request to the controller @nid: through
 * stor_ctl_chunk_allocate() when net_islocalcall(nid) is true, through
 * stor_rpc_chunk_allocate() otherwise.
 *
 * @return 0 on success, otherwise the error code of the callee
 */
STATIC int __md_chunk_allocate(const nid_t *nid, const fileid_t *fileid,
                const chkid_t *chkid, int chknum, int fill)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_chunk_allocate(fileid, chkid, chknum, fill);
        else
                ret = stor_rpc_chunk_allocate(nid, fileid, chkid, chknum, fill);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Allocate chunks through the controller that serves @fileid.
 *
 * The controller is looked up with md_map_getsrv(); on EREMCHG or ETIMEDOUT
 * the cached mapping is dropped and the request is repeated once.
 *
 * @param fileid    id whose controller handles the request
 * @param chkid     chunk id forwarded to the allocate call
 * @param chknum    forwarded to the allocate call
 * @param fill      fill with 0
 * @param parentnid optional out: node that served the request
 * @return 0 on success, otherwise an errno-style code
 */
int md_chunk_allocate(const fileid_t *fileid, const chkid_t *chkid,
                int chknum, int fill, nid_t *parentnid)
{
        int ret;
        int remapped = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(fileid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_chunk_allocate(&srv, fileid, chkid, chknum, fill);
                if (!ret)
                        break;

                /* a single remap-and-retry, only for EREMCHG / ETIMEDOUT */
                if (remapped || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fileid, &srv);
                remapped = 1;
        }

        if (parentnid)
                *parentnid = srv;

        return 0;
err_ret:
        return ret;
}

/*
 * Argument bundle passed from __md_chunk_getinfo_local() to the task entry
 * __md_chunk_getinfo_local__(). parent, chkid and chkinfo are forwarded to
 * stor_ctl_chunk_getinfo(); all pointers are borrowed from the caller.
 */
typedef struct {
        const chkid_t *parent;
        chkinfo_t *chkinfo;
        const chkid_t *chkid;
        task_t task; /* value of schedule_task_get() in the waiting caller; handed to schedule_resume() */
} getinfo_arg_t;

/*
 * Task entry created by __md_chunk_getinfo_local(): performs the local
 * stor_ctl_chunk_getinfo() call described by the getinfo_arg_t in @_arg,
 * logs the outcome and passes the return code to schedule_resume() for the
 * task recorded in the argument.
 */
STATIC void __md_chunk_getinfo_local__(void *_arg)
{
        getinfo_arg_t *ctx = _arg;
        int ret = stor_ctl_chunk_getinfo(ctx->parent, ctx->chkid, ctx->chkinfo);

        DBUG("getinfo "CHKID_FORMAT" from "CHKID_FORMAT" localy ret:%d\n",
              CHKID_ARG(ctx->chkid), CHKID_ARG(ctx->parent), ret);

        schedule_resume(&ctx->task, ret, NULL);
}

/**
 * Run stor_ctl_chunk_getinfo() on the local controller.
 *
 * When called from inside a scheduler task (schedule_running() is true) the
 * lookup is executed by a newly created task (__md_chunk_getinfo_local__)
 * and this task yields until it is resumed with the result. The original
 * author's note for this indirection is "avoid stack overlapping" --
 * presumably it keeps the controller call off the caller's task stack
 * (TODO confirm). Outside the scheduler the call is made directly.
 *
 * @return 0 on success, otherwise the error code of the lookup
 */
STATIC int __md_chunk_getinfo_local(const chkid_t *parent, const chkid_t *chkid,
                                    chkinfo_t *chkinfo)
{
        int ret;
        getinfo_arg_t arg;

        if (schedule_running()) {
                /* arg lives on this stack; it stays valid because we do not
                 * return before schedule_yield() comes back */
                arg.parent = parent;
                arg.chkinfo = chkinfo;
                arg.chkid = chkid;
                arg.task = schedule_task_get();

                schedule_task_new("chunk_getinfo_local", __md_chunk_getinfo_local__, &arg, -1);

                ret = schedule_yield("chunk_getinfo_local", NULL, NULL);
                if (unlikely(ret)) {
                        /* ret is a signed int: print it with %d, like the
                         * matching log line in __md_chunk_getinfo_local__() */
                        DWARN("getinfo "CHKID_FORMAT" localy fail %d\n",
                              CHKID_ARG(chkid), ret);
                        GOTO(err_ret, ret);
                }
        } else {
                ret = stor_ctl_chunk_getinfo(parent, chkid, chkinfo);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Fetch the chunk info of @chkid from the controller @nid: through
 * __md_chunk_getinfo_local() when net_islocalcall(nid) is true, through
 * stor_rpc_chunk_getinfo() otherwise. On success the id stored in @chkinfo
 * is asserted to equal @chkid.
 *
 * @return 0 on success, otherwise the error code of the callee
 */
STATIC int __md_chunk_getinfo(const nid_t *nid, const fileid_t *parent,
                              const fileid_t *chkid, chkinfo_t *chkinfo)
{
        int ret;

        if (net_islocalcall(nid))
                ret = __md_chunk_getinfo_local(parent, chkid, chkinfo);
        else
                ret = stor_rpc_chunk_getinfo(nid, parent, chkid, chkinfo);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(chkid_cmp(chkid, &chkinfo->id) == 0);

        return 0;
err_ret:
        return ret;
}

/**
 * Look up the chunk info of @chkid through the controller of its parent and
 * make sure the chunk's first replica is reachable (md_chunk_online()).
 *
 * @note If the controller is offline, this can potentially trigger a
 *       controller switch-over.
 *
 * @param pool      pool name, forwarded to md_root_lookup() and
 *                  md_chunk_online()
 * @param parent    parent id, or NULL to resolve it with md_parent_get()
 * @param chkid     chunk to look up
 * @param chkinfo   out: chunk info of @chkid
 * @param parentnid optional out: node that served the lookup
 * @return 0 on success, otherwise an errno-style code; for pool and volume
 *         chunks ENOKEY is reported as ENOENT
 */
int md_chunk_getinfo(const char *pool, const fileid_t *parent, const fileid_t *chkid,
                     chkinfo_t *chkinfo, nid_t *parentnid)
{
        int ret, retry = 0;
        nid_t nid;
        fileid_t _parent;

        if (parent == NULL) {
                ret = md_parent_get(chkid, &_parent);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                parent = &_parent;
        }

        /* a root parent is handled entirely by md_root_lookup() */
        if (chkid_isroot(parent)) {
                return md_root_lookup(pool, parent, chkid, chkinfo, parentnid);
        }

retry:
        ret = md_map_getsrv(parent, &nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __md_chunk_getinfo(&nid, parent, chkid, chkinfo);
        if (unlikely(ret)) {
                /* one retry with a fresh mapping; the retry flag is shared
                 * with the md_chunk_online() failure path below */
                if ((ret == EREMCHG || ret == ETIMEDOUT) && retry == 0) {
                        md_map_drop(parent, &nid);
                        retry = 1;
                        goto retry;
                } else
                        GOTO(err_ret, ret);
        }

        ret = md_chunk_online(pool, parent, chkinfo);
        if (unlikely(ret)) {
                /* restarts the whole lookup once, without dropping the
                 * cached mapping */
                if ((ret == EREMCHG || ret == ETIMEDOUT) && retry == 0) {
                        //md_map_drop(parent, &nid);
                        retry = 1;
                        goto retry;
                } else
                        GOTO(err_ret, ret);
        }

        YASSERT(chkid_cmp(chkid, &chkinfo->id) == 0);

        md_map_update(&chkinfo->id, &chkinfo->diskid[0].id);

        if (parentnid)
                *parentnid = nid;

        return 0;
err_ret:
        /* callers of pool/volume lookups see ENOENT instead of ENOKEY */
        if (chkid->type == __POOL_CHUNK__ || chkid->type == __VOLUME_CHUNK__ ) {
                ret = (ret == ENOKEY) ? ENOENT : ret;
        }
        return ret;
}

/**
 * Look up the chunk info of @chkid through the controller of its parent.
 *
 * Unlike md_chunk_getinfo() this variant does not call md_chunk_online(),
 * uses md_root_lookup1() for a root parent and does not translate ENOKEY.
 *
 * @param pool      pool name, forwarded to md_root_lookup1()
 * @param parent    parent id, or NULL to resolve it with md_parent_get()
 * @param chkid     chunk to look up
 * @param chkinfo   out: chunk info of @chkid
 * @param parentnid optional out: node that served the lookup
 * @return 0 on success, otherwise an errno-style code
 */
int md_chunk_getinfo1(const char *pool, const fileid_t *parent, const fileid_t *chkid,
                      chkinfo_t *chkinfo, nid_t *parentnid)
{
        int ret;
        int remapped = 0;
        nid_t srv;
        fileid_t resolved;

        if (parent == NULL) {
                ret = md_parent_get(chkid, &resolved);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                parent = &resolved;
        }

        if (chkid_isroot(parent)) {
                return md_root_lookup1(pool, parent, chkid, chkinfo, parentnid);
        }

        for (;;) {
                ret = md_map_getsrv(parent, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_chunk_getinfo(&srv, parent, chkid, chkinfo);
                if (!ret)
                        break;

                /* a single remap-and-retry, only for EREMCHG / ETIMEDOUT */
                if (remapped || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(parent, &srv);
                remapped = 1;
        }

        md_map_update(&chkinfo->id, &chkinfo->diskid[0].id);

        if (parentnid)
                *parentnid = srv;

        return 0;
err_ret:
        return ret;
}

/**
 * Send a table read request to the controller @nid: through
 * stor_ctl_table_read() when net_islocalcall(nid) is true, through
 * stor_rpc_table_read() otherwise.
 *
 * @return 0 on success, otherwise the error code of the callee
 */
STATIC int __md_chunk_table_read(const nid_t *nid, const fileid_t *fileid, const chkid_t *chkid,
                buffer_t *buf, uint32_t size, uint64_t offset)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_table_read(fileid, chkid, buf, size, offset);
        else
                ret = stor_rpc_table_read(nid, fileid, chkid, buf, size, offset);

        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Read @size bytes at @offset of the table of @chkid into @buf, through the
 * controller that serves @fileid.
 *
 * On EREMCHG or ETIMEDOUT the cached server mapping is dropped and the
 * request is repeated once.
 *
 * @return 0 on success, otherwise an errno-style code
 */
int md_chunk_table_read(const fileid_t *fileid, const chkid_t *chkid, buffer_t *buf, uint32_t size, uint64_t offset)
{
        int ret;
        int remapped = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(fileid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_chunk_table_read(&srv, fileid, chkid, buf, size, offset);
                if (!ret)
                        break;

                /* a single remap-and-retry, only for EREMCHG / ETIMEDOUT */
                if (remapped || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fileid, &srv);
                remapped = 1;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Send a table write request to the controller @nid: through
 * stor_ctl_table_write() when net_islocalcall(nid) is true, through
 * stor_rpc_table_write() otherwise.
 *
 * @return 0 on success, otherwise the error code of the callee
 */
STATIC int __md_chunk_table_write(const nid_t *nid, const fileid_t *fileid, const chkid_t *chkid,
                const buffer_t *buf, uint32_t size, uint64_t offset)
{
        int ret;

        if (net_islocalcall(nid))
                ret = stor_ctl_table_write(fileid, chkid, buf, size, offset);
        else
                ret = stor_rpc_table_write(nid, fileid, chkid, buf, size, offset);

        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Write @size bytes from @buf at @offset of the table of @chkid, through the
 * controller that serves @fileid.
 *
 * On EREMCHG or ETIMEDOUT the cached server mapping is dropped and the
 * request is repeated once.
 *
 * @return 0 on success, otherwise an errno-style code
 */
int md_chunk_table_write(const fileid_t *fileid, const chkid_t *chkid, const buffer_t *buf, uint32_t size, uint64_t offset)
{
        int ret;
        int remapped = 0;
        nid_t srv;

        for (;;) {
                ret = md_map_getsrv(fileid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __md_chunk_table_write(&srv, fileid, chkid, buf, size, offset);
                if (!ret)
                        break;

                /* a single remap-and-retry, only for EREMCHG / ETIMEDOUT */
                if (remapped || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(fileid, &srv);
                remapped = 1;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Translate @path into a chunk id.
 *
 * A @path that starts with '/' is resolved with stor_lookup1x() inside
 * @pool; any other @path is parsed with str2chkid() and rejected with EINVAL
 * when chkid_isvalid() fails on the result.
 *
 * @param pool   pool name, used only for '/' paths
 * @param path   IN
 * @param _chkid OUT
 * @return 0 on success, otherwise an errno-style code
 */
int md_chunk_getid(const char *pool, const char *path, chkid_t *_chkid)
{
        int ret, retry = 0;

        if (path[0] == '/') {
        retry:
                ret = stor_lookup1x(pool, path, _chkid);
                if (ret) {
                        if (ret == EAGAIN) {
                                /* NOTE(review): USLEEP_RETRY is defined
                                 * elsewhere; presumably it sleeps 100ms and
                                 * jumps back to the retry label up to 50
                                 * times, then to err_ret -- confirm. */
                                USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                        } else
                                GOTO(err_ret, ret);
                }
        } else {
                str2chkid(_chkid, path);
                if (!chkid_isvalid(_chkid)) {
                        ret = EINVAL;
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Derive the site name of @chkid from the location of its first replica.
 *
 * The chunk info is fetched with md_chunk_getinfo(), the name returned by
 * network_rname() for diskid[0] is split with hosts_split() and the site
 * component is copied to @site_name. @site_name is set to the empty string
 * before the lookup, so it is empty on failure.
 *
 * @param site_name out: caller-provided buffer; its size is not checked here
 * @return 0 on success, otherwise the error code of md_chunk_getinfo()
 */
int md_chunk_getsite_byinfo(const char *pool, const fileid_t *chkid, char *site_name)
{
        int ret;
        char infobuf[CHKINFO_MAX];
        chkinfo_t *info = (void *)infobuf;
        const char *rname;
        char site[MAX_NAME_LEN], rack[MAX_NAME_LEN];
        char node[MAX_NAME_LEN], disk[MAX_NAME_LEN];

        site_name[0] = '\0';

        ret = md_chunk_getinfo(pool, NULL, chkid, info, NULL);
        if (ret)
                GOTO(err_ret, ret);

        rname = network_rname(&info->diskid[0].id);
        hosts_split(rname, site, rack, node, disk);
        strcpy(site_name, site);

        return 0;
err_ret:
        return ret;
}

/**
 * Read the site name of @chkid (single attempt).
 *
 * The STORAGE_AREA_KEY xattr is read from the server returned by
 * md_map_getsrv(). When that attribute does not exist (ENOKEY) the site is
 * derived from the chunk's first replica via md_chunk_getsite_byinfo().
 *
 * @param site_name out: caller-provided buffer; its size is not checked here
 * @return 0 on success, otherwise an errno-style code
 */
int __md_chunk_getsite(const char *pool, const fileid_t *chkid, char *site_name)
{
        int ret, buflen;
        nid_t nid;
        char buf[MAX_NAME_LEN];

        ret = md_map_getsrv(chkid, &nid);
        if (ret)
                GOTO(err_ret, ret);

        /* __md_chunk_setsite() stores strlen() bytes, i.e. without the
         * terminating NUL, so the value read back cannot be assumed to be
         * terminated. Zero the buffer first so a shorter value becomes a
         * valid C string before it is handed to strcpy(). */
        memset(buf, 0, sizeof(buf));
        buflen = MAX_NAME_LEN;
        ret = md_xattr_get(&nid, chkid, STORAGE_AREA_KEY, buf, &buflen);
        if (ret) {
                if (ret != ENOKEY)
                        GOTO(err_ret, ret);

                /* attribute not set: fall back to the replica location */
                ret = md_chunk_getsite_byinfo(pool, chkid, site_name);
                if (ret)
                        GOTO(err_ret, ret);
        } else {
                /* also covers a value that fills the whole buffer */
                buf[sizeof(buf) - 1] = '\0';
                strcpy(site_name, buf);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Copy the site component of the local node's name to @site_name.
 *
 * The name returned by network_rname() for the nid from net_getnid() is
 * split with hosts_split(); only the site part is used.
 *
 * @param site_name out: caller-provided buffer; its size is not checked here
 * @return always 0
 */
int md_chunk_getsite_bylocal(char *site_name)
{
        nid_t self = *net_getnid();
        const char *rname = network_rname(&self);
        char site[MAX_NAME_LEN], rack[MAX_NAME_LEN];
        char node[MAX_NAME_LEN], disk[MAX_NAME_LEN];

        hosts_split(rname, site, rack, node, disk);
        strcpy(site_name, site);

        return 0;
}

/**
 * Read the site name of @chkid, retrying __md_chunk_getsite() on failure.
 *
 * @param pool      pool name, forwarded to __md_chunk_getsite()
 * @param chkid     chunk whose site is queried
 * @param site_name out: caller-provided buffer
 * @return 0 on success, otherwise an errno-style code
 */
int md_chunk_getsite(const char *pool, const fileid_t *chkid, char *site_name)
{
        int ret, retry = 0;

retry:
        ret = __md_chunk_getsite(pool, chkid, site_name);
        if (ret) {
                /* NOTE(review): every error code is retried here.
                 * USLEEP_RETRY is defined elsewhere; presumably it sleeps
                 * 100ms and jumps back to the retry label up to 50 times,
                 * then to err_ret -- confirm against the macro. */
                USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Store @site_name as the STORAGE_AREA_KEY xattr of @chkid (single attempt).
 * The value is written with strlen(site_name) bytes, i.e. without the
 * terminating NUL.
 *
 * @return 0 on success, otherwise the error code of md_xattr_set()
 */
int __md_chunk_setsite(const fileid_t *chkid, const char *site_name)
{
        int ret = md_xattr_set(NULL, chkid, STORAGE_AREA_KEY,
                               site_name, strlen(site_name), 0);

        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Store @site_name for @chkid, retrying __md_chunk_setsite() on failure.
 *
 * @param chkid     chunk whose site is set
 * @param site_name NUL-terminated site name
 * @return 0 on success, otherwise an errno-style code
 */
int md_chunk_setsite(const fileid_t *chkid, const char *site_name)
{
        int ret, retry = 0;

retry:
        ret = __md_chunk_setsite(chkid, site_name);
        if (ret) {
                /* NOTE(review): every error code is retried here.
                 * USLEEP_RETRY is defined elsewhere; presumably it sleeps
                 * 100ms and jumps back to the retry label up to 50 times,
                 * then to err_ret -- confirm against the macro. */
                USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Set the site of the chunk named by @path.
 *
 * @path is translated with md_chunk_getid() and the resulting id is passed
 * to md_chunk_setsite() together with @_site.
 *
 * @return 0 on success, otherwise the error code of the failing step
 */
int md_chunk_setsite_path(const char *pool, const char *path, const char *_site)
{
        int ret;
        chkid_t target;

        ret = md_chunk_getid(pool, path, &target);
        if (ret)
                GOTO(err_ret, ret);

        ret = md_chunk_setsite(&target, _site);
        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Send @vfm for @chkid to the server of volume @volid with
 * stor_rpc_vfm_set().
 *
 * @volid must be of type __VOLUME_CHUNK__ (asserted). On EREMCHG or
 * ETIMEDOUT the cached server mapping is dropped and the request is repeated
 * once.
 *
 * @return 0 on success, otherwise an errno-style code
 */
int md_vfm_set(const volid_t *volid, const chkid_t *chkid, const vfm_t *vfm)
{
        int ret;
        int remapped = 0;
        nid_t srv;

        YASSERT(volid->type == __VOLUME_CHUNK__);

        for (;;) {
                ret = md_map_getsrv(volid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = stor_rpc_vfm_set(&srv, volid, chkid, vfm);
                if (!ret)
                        break;

                /* a single remap-and-retry, only for EREMCHG / ETIMEDOUT */
                if (remapped || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(volid, &srv);
                remapped = 1;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Fetch the vfm of @chkid into @vfm from the server of volume @volid with
 * stor_rpc_vfm_get().
 *
 * @volid must be of type __VOLUME_CHUNK__ (asserted). On EREMCHG or
 * ETIMEDOUT the cached server mapping is dropped and the request is repeated
 * once.
 *
 * @return 0 on success, otherwise an errno-style code
 */
int md_vfm_get(const volid_t *volid, const chkid_t *chkid, vfm_t *vfm)
{
        int ret;
        int remapped = 0;
        nid_t srv;

        YASSERT(volid->type == __VOLUME_CHUNK__);

        for (;;) {
                ret = md_map_getsrv(volid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = stor_rpc_vfm_get(&srv, volid, chkid, vfm);
                if (!ret)
                        break;

                /* a single remap-and-retry, only for EREMCHG / ETIMEDOUT */
                if (remapped || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(volid, &srv);
                remapped = 1;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Query the server of volume @volid with stor_rpc_vfm_stat(); the result is
 * returned through @count.
 *
 * @volid must be of type __VOLUME_CHUNK__ (asserted). On EREMCHG or
 * ETIMEDOUT the cached server mapping is dropped and the request is repeated
 * once.
 *
 * @return 0 on success, otherwise an errno-style code
 */
int md_vfm_stat(const volid_t *volid, int *count)
{
        int ret;
        int remapped = 0;
        nid_t srv;

        YASSERT(volid->type == __VOLUME_CHUNK__);

        for (;;) {
                ret = md_map_getsrv(volid, &srv);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = stor_rpc_vfm_stat(&srv, volid, count);
                if (!ret)
                        break;

                /* a single remap-and-retry, only for EREMCHG / ETIMEDOUT */
                if (remapped || (ret != EREMCHG && ret != ETIMEDOUT))
                        GOTO(err_ret, ret);

                md_map_drop(volid, &srv);
                remapped = 1;
        }

        return 0;
err_ret:
        return ret;
}
